home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Java 1996 August
/
Java - Summer 1996.iso
/
kaffe-0.2
/
kaffe
/
threadCalls.c
< prev
next >
Wrap
C/C++ Source or Header
|
1996-02-18
|
4KB
|
202 lines
/*
* threadCalls.c
* Support for threaded ops which may block (read, write, connect, etc.).
*
* Copyright (c) 1996 Systems Architecture Research Centre,
* City University, London, UK.
*
* See the file "license.terms" for information on usage and redistribution
* of this file, and for a DISCLAIMER OF ALL WARRANTIES.
*
* Written by Tim Wilkinson <tim@sarc.city.ac.uk>, February 1996.
*/
#include <sys/types.h>
#include <errno.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/time.h>
#include "object.h"
#include "thread.h"
/* Operation codes passed to blockOnFile(): the kind of readiness to wait for. */
#define TH_READ 0
#define TH_WRITE 1
/* Largest descriptor passed to blockOnFile() so far; only ever raised in the code shown here. */
static int maxFd;
/* Descriptors that currently have a thread suspended waiting to read. */
static fd_set readsPending;
/* Descriptors that currently have a thread suspended waiting to write. */
static fd_set writesPending;
/* Per-descriptor heads of the queues of threads suspended for reading. */
static thread* readQ[FD_SETSIZE];
/* Per-descriptor heads of the queues of threads suspended for writing. */
static thread* writeQ[FD_SETSIZE];
/* Zero timeout handed to select() when checkEvents() must poll without waiting. */
static struct timeval tm = { 0, 0 };
/* Suspends the current thread on a descriptor; defined further down. */
static void blockOnFile(int, int);
/* Prototype only: no definition of waitOnEvents() is visible in this part of the file. */
void waitOnEvents(void);
/*
 * Create a socket on behalf of threaded code.
 *
 * The arguments are forwarded to socket() untouched and its result
 * (a descriptor, or -1 on failure) is handed straight back.
 */
int
threadedCreateSocket(int af, int type, int proto)
{
	int sock;

	sock = socket(af, type, proto);

	/* Make non-blocking and I/O signalling here */

	return sock;
}
/*
* Threaded socket connect.
*/
int
threadedConnect(int fd, struct sockaddr* addr, int len)
{
int r;
r = connect(fd, addr, len);
if (r < 0 & (errno == EINPROGRESS || errno == EALREADY)) {
blockOnFile(fd, TH_READ);
r = 0; /* Assume it's okay when we get released */
}
return (r);
}
/*
 * Threaded socket accept.
 *
 * Calls accept() on fd and, while it reports that no connection is
 * available yet, suspends the calling thread through blockOnFile() and
 * tries again once released.
 *
 * Returns the new descriptor from accept(), or a negative value with
 * errno set when accept() fails for any other reason.
 */
int
threadedAccept(int fd, struct sockaddr* addr, int* len)
{
	int r;

	for (;;) {
		r = accept(fd, addr, len);
		if (r >= 0) {
			break;
		}
		/*
		 * A non-blocking accept() with nothing queued fails with
		 * EWOULDBLOCK/EAGAIN.  EINPROGRESS and EALREADY are still
		 * treated as "retry" as they were before.
		 */
		if (!(errno == EWOULDBLOCK || errno == EAGAIN ||
		      errno == EINPROGRESS || errno == EALREADY)) {
			break;
		}
		blockOnFile(fd, TH_READ);
	}
	return (r);
}
/*
 * Read but only if we can.
 *
 * Keeps calling read() until len bytes have been stored in buf, read()
 * returns 0, or a real error occurs.  When read() would block, the
 * calling thread is suspended through blockOnFile() and the read is
 * retried after it is released.
 *
 * Returns the number of bytes placed in buf.  If an error other than
 * "would block" happens before any byte was read, returns -1 with errno
 * left as set by read(); if it happens after some bytes were read, the
 * count read so far is returned.
 */
int
threadedRead(int fd, char* buf, int len)
{
	int r;
	char* ptr;

	ptr = buf;
	for (;;) {
		r = read(fd, ptr, len);
		if (r < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* Nothing available yet: wait, then retry. */
				blockOnFile(fd, TH_READ);
				continue;
			}
			/* Real error: never fold the -1 into the byte count. */
			if (ptr == buf) {
				return (-1);
			}
			break;
		}
		ptr += r;
		len -= r;
		if (r == 0 || len <= 0) {
			break;
		}
	}
	return ((int)(ptr - buf));
}
/*
 * Write but only if we can.
 *
 * Keeps calling write() until all len bytes of buf have been written,
 * write() returns 0, or a real error occurs.  When write() would block,
 * the calling thread is suspended through blockOnFile() and the write
 * is retried after it is released.
 *
 * Returns the number of bytes written.  If an error other than
 * "would block" happens before any byte was written, returns -1 with
 * errno left as set by write(); if it happens after some bytes were
 * written, the count written so far is returned.
 */
int
threadedWrite(int fd, char* buf, int len)
{
	int r;
	char* ptr;

	ptr = buf;
	for (;;) {
		r = write(fd, ptr, len);
		if (r < 0) {
			if (errno == EWOULDBLOCK || errno == EAGAIN) {
				/* No room yet: wait, then retry. */
				blockOnFile(fd, TH_WRITE);
				continue;
			}
			/* Real error: never fold the -1 into the byte count. */
			if (ptr == buf) {
				return (-1);
			}
			break;
		}
		ptr += r;
		len -= r;
		if (r == 0 || len <= 0) {
			break;
		}
	}
	return ((int)(ptr - buf));
}
/*
 * An attempt to access a file would block, so suspend the current thread
 * until it is released again.
 *
 *  fd - descriptor to wait on; must lie in [0, FD_SETSIZE).
 *  op - TH_READ to wait on the read side; any other value waits on the
 *       write side.
 *
 * The descriptor is marked in the matching pending set for as long as
 * the thread is suspended and unmarked once suspendOnQThread() returns.
 */
static
void
blockOnFile(int fd, int op)
{
	/* fd indexes readQ/writeQ and the fd_sets, all sized FD_SETSIZE. */
	assert(fd >= 0 && fd < FD_SETSIZE);

	/* Remember the largest descriptor ever waited on. */
	if (fd > maxFd) {
		maxFd = fd;
	}
	if (op == TH_READ) {
		FD_SET(fd, &readsPending);
		suspendOnQThread(currentThread, &readQ[fd]);
		FD_CLR(fd, &readsPending);
	}
	else {
		FD_SET(fd, &writesPending);
		suspendOnQThread(currentThread, &writeQ[fd]);
		FD_CLR(fd, &writesPending);
	}
}
/*
 * Resume every thread linked on *queue and leave the queue empty.
 */
static void
resumeAllOnQ(thread** queue)
{
	thread* tid;
	thread* ntid;

	for (tid = *queue; tid != 0; tid = ntid) {
		/* Pick up the link before resuming; presumably resumeThread() may relink tid. */
		ntid = tid->next;
		resumeThread(tid);
	}
	*queue = 0;
}

/*
 * Check whether any descriptor that threads are waiting on has become
 * ready, and resume the threads queued on each ready descriptor.
 *
 *  block - when true, select() is called with no timeout and so waits
 *          for a descriptor to become ready; when false, select() is
 *          given a zero timeout and only polls.
 *
 * The whole operation runs between intsDisable() and intsRestore().
 */
void
checkEvents(bool block)
{
	int r;
	fd_set rd;
	fd_set wr;
	int i;

	intsDisable();

	/* select() overwrites its sets, so work on copies of the pending sets. */
#if defined(FD_COPY)
	FD_COPY(&readsPending, &rd);
	FD_COPY(&writesPending, &wr);
#else
	bcopy(&readsPending, &rd, sizeof(rd));
	bcopy(&writesPending, &wr, sizeof(wr));
#endif

	/* The first argument is a descriptor count: highest descriptor + 1. */
	r = select(maxFd + 1, &rd, &wr, 0, (block ? 0 : &tm));

	/*
	 * r counts the ready bits across both sets; stop scanning once all
	 * of them are accounted for.  Test the sets select() filled in, not
	 * the pending sets, so only waiters on ready descriptors are woken.
	 */
	for (i = 0; r > 0 && i <= maxFd; i++) {
		if (FD_ISSET(i, &rd)) {
			resumeAllOnQ(&readQ[i]);
			r--;
		}
		if (FD_ISSET(i, &wr)) {
			resumeAllOnQ(&writeQ[i]);
			r--;
		}
	}

	intsRestore();
}